﻿/**

 * Copyright (c) 2015-2016, FastDev 刘强 (fastdev@163.com) & Quincy.

 *

 * Licensed under the Apache License, Version 2.0 (the "License");

 * you may not use this file except in compliance with the License.

 * You may obtain a copy of the License at

 *

 *      http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

using OF.DistributeService.Core.Common;
using OF.Notify.DataHost.Cluster;
using OF.Notify.Entity;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;



namespace OF.Notify.Master
{    
    /// <summary>
    /// Holds the state of one cluster of nodes for a single topic: the node ids that
    /// form the cluster, the currently active collection, and, per consumer, a map of
    /// the collections that are marked as being on instant process (collection id
    /// mapped to the id of the node processing it).
    /// </summary>
    /// <remarks>
    /// Mutations of the active collection are serialized with <c>lock (this)</c>.
    /// NOTE(review): a private lock object would normally be preferable, but code
    /// outside this file may synchronize on the instance, so the lock target is kept.
    /// </remarks>
    public class TopicCluster
    {
        /// <summary>
        /// The collection currently receiving messages for the owning cluster, together
        /// with a delivery counter and the consumers that finished it while it was active.
        /// </summary>
        internal class ClusterActiveCollection
        {
            internal int activeCollectionId = ClusterContext.InvalidateId;

            // Number of delivery attempts made so far; incremented by every CanDeliverMsg call.
            internal int activeCollectionMsgCount = 0;

            // Consumers that reported completion while this collection was still active.
            internal ConcurrentBag<byte> finishedConsumerList = new ConcurrentBag<byte>();

            /// <summary>Creates the tracking state for the given collection id.</summary>
            /// <param name="activeCollectionId">Id of the collection that becomes active.</param>
            public ClusterActiveCollection(int activeCollectionId)
            {
                this.activeCollectionId = activeCollectionId;
            }

            /// <summary>Records that a consumer has finished this collection.</summary>
            /// <param name="consumerEnum">Identifier of the finished consumer.</param>
            public void AddFinishedConsumer(byte consumerEnum)
            {
                finishedConsumerList.Add(consumerEnum);
            }

            /// <summary>Returns the id of this collection.</summary>
            public int GetActiveCollectionId()
            {
                return this.activeCollectionId;
            }

            /// <summary>
            /// Atomically counts one more delivery attempt and reports whether the count
            /// is still within the configured maximum message count.
            /// </summary>
            /// <returns>
            /// <c>true</c> while the incremented counter does not exceed
            /// <c>ClusterContext.Get().GetMaxMessageCount()</c>; otherwise <c>false</c>.
            /// </returns>
            public bool CanDeliverMsg()
            {
                var context = ClusterContext.Get();
                // The counter is incremented on every call, including calls that are refused.
                int msgCount = Interlocked.Increment(ref activeCollectionMsgCount);
                return msgCount <= context.GetMaxMessageCount();
            }
        }

        internal byte topicEnum;
        public int Id;

        // Number of collections that have been made active on this cluster so far.
        internal int clusterCollectionCount = 0;
        protected List<short> nodeIds = null;
        internal ClusterActiveCollection activeCollection = null;

        // Per-consumer map: collection id -> id of the node processing it.
        internal ConsumerInstanceProvider<ConcurrentDictionary<int, short>> onProcessDictProvider = null;

        // Id of the collection that was active before the most recent SetActiveCollection call.
        public int lastActiveCollectionId = ClusterContext.InvalidateId;
        internal bool isDisposed = false;

        /// <summary>Returns the on-process map (collection id to node id) of a consumer.</summary>
        /// <param name="consumerEnum">Identifier of the consumer.</param>
        internal ConcurrentDictionary<int, short> GetOnProcessDict(byte consumerEnum)
        {
            return onProcessDictProvider.GetInstance(consumerEnum);
        }

        /// <summary>Tells whether the collection is present in the consumer's on-process map.</summary>
        /// <param name="consumerEnum">Identifier of the consumer.</param>
        /// <param name="collectionId">Id of the collection to look up.</param>
        public bool IsCollectionOnInstantProcess(byte consumerEnum, int collectionId)
        {
            return GetOnProcessDict(consumerEnum).ContainsKey(collectionId);
        }

        /// <summary>Returns the number of entries in the consumer's on-process map.</summary>
        /// <param name="consumerEnum">Identifier of the consumer.</param>
        public int GetOnInstanceProcessCollectionCount(byte consumerEnum)
        {
            return GetOnProcessDict(consumerEnum).Count;
        }

        /// <summary>
        /// Removes, for every consumer of this topic, all on-process entries that are
        /// currently assigned to the given node.
        /// </summary>
        /// <param name="nodeId">Id of the node whose entries are dropped.</param>
        public void DisposeNodeOnInstantProcess(short nodeId)
        {
            var topicContext = ClusterContext.Get().GetTopicContext(topicEnum);
            foreach (var consumerEnum in topicContext.GetAllConsumerEnums())
            {
                var dict = GetOnProcessDict(consumerEnum);
                // The pair-based Remove deletes an entry only when key AND value still
                // match, so an entry reassigned to another node in the meantime is left
                // untouched. ConcurrentDictionary may be modified while it is enumerated.
                ICollection<KeyValuePair<int, short>> entries = dict;
                foreach (var entry in dict)
                {
                    if (entry.Value == nodeId)
                    {
                        entries.Remove(entry);
                    }
                }
            }
        }

        /// <summary>
        /// Marks the currently active collection as on instant process for a consumer.
        /// </summary>
        /// <param name="consumerEnum">Identifier of the consumer.</param>
        /// <param name="collectionId">
        /// Receives the id of the active collection; left unchanged when there is none.
        /// </param>
        /// <returns>
        /// <c>true</c> when a new entry was added; <c>false</c> when no collection is
        /// active or the collection was already present for this consumer.
        /// </returns>
        public bool SetCurrentCollectionOnInstantProcess(byte consumerEnum, ref int collectionId)
        {
            lock (this)
            {
                var current = activeCollection;
                if (current == null)
                {
                    return false;
                }

                collectionId = current.GetActiveCollectionId();
                // The entry starts without an owning node; see SetInstantProcessNodeId.
                return GetOnProcessDict(consumerEnum).TryAdd(collectionId, ClusterContext.InvalidateId);
            }
        }

        /// <summary>
        /// Assigns the processing node of a collection that is already in the consumer's
        /// on-process map. Does nothing when the collection is not present.
        /// </summary>
        /// <param name="consumerEnum">Identifier of the consumer.</param>
        /// <param name="collectionId">Id of the collection to update.</param>
        /// <param name="nodeId">Id of the node now processing the collection.</param>
        public void SetInstantProcessNodeId(byte consumerEnum, int collectionId, short nodeId)
        {
            var dict = GetOnProcessDict(consumerEnum);
            short existing;
            // Update-only: unlike ContainsKey followed by AddOrUpdate, this can never
            // re-create an entry that another thread removed in between. The loop retries
            // only when the value changed between the read and the compare-and-swap.
            while (dict.TryGetValue(collectionId, out existing))
            {
                if (dict.TryUpdate(collectionId, nodeId, existing))
                {
                    return;
                }
            }
        }

        /// <summary>
        /// Signals that a consumer finished a collection. If the collection is still the
        /// active one, the consumer is only recorded as finished and its on-process entry
        /// is removed later, when the active collection is replaced; otherwise the entry
        /// is removed immediately.
        /// </summary>
        /// <param name="consumerEnum">Identifier of the consumer.</param>
        /// <param name="collectionId">Id of the finished collection.</param>
        public void RemoveCollectionOnInstantProcess(byte consumerEnum, int collectionId)
        {
            lock (this)
            {
                var current = this.activeCollection;
                if (current != null && current.activeCollectionId == collectionId)
                {
                    current.AddFinishedConsumer(consumerEnum);
                    return;
                }

                short removed = 0;
                GetOnProcessDict(consumerEnum).TryRemove(collectionId, out removed);
            }
        }

        /// <summary>
        /// Creates a cluster and an empty on-process map for every consumer of the topic.
        /// </summary>
        /// <param name="id">Id of the cluster.</param>
        /// <param name="nodeIds">Ids of the nodes in the cluster; the list is stored by reference.</param>
        /// <param name="topicEnum">Identifier of the topic served by the cluster.</param>
        public TopicCluster(int id, List<short> nodeIds, byte topicEnum)
        {
            this.Id = id;
            this.nodeIds = nodeIds;
            this.topicEnum = topicEnum;

            var topicContext = ClusterContext.Get().GetTopicContext(topicEnum);
            var perConsumer = new Dictionary<byte, ConcurrentDictionary<int, short>>();
            foreach (var consumerEnum in topicContext.GetAllConsumerEnums())
            {
                perConsumer.Add(consumerEnum, new ConcurrentDictionary<int, short>());
            }
            onProcessDictProvider = new ConsumerInstanceProvider<ConcurrentDictionary<int, short>>(perConsumer);
            //Util.LogInfo("....create cluster:" + id + "," + string.Join(" ", nodeIds));
        }

        /// <summary>
        /// Creates a new cluster with the given id for the same topic. The node id list
        /// is shared with this instance (same reference); no other state is copied.
        /// </summary>
        /// <param name="newId">Id of the new cluster.</param>
        public TopicCluster GetCloneCluster(int newId)
        {
            return new TopicCluster(newId, this.nodeIds, this.topicEnum);
        }

        /// <summary>
        /// Tells whether the number of collections activated on this cluster is still
        /// below <c>ClusterContext.Get().GetMaxClusterCollectionCount()</c>.
        /// </summary>
        public bool IsClusterCollectionCountValidate()
        {
            return clusterCollectionCount < ClusterContext.Get().GetMaxClusterCollectionCount();
        }

        /// <summary>
        /// Removes the on-process entry of the outgoing active collection for every
        /// consumer that was recorded as finished while the collection was active.
        /// </summary>
        /// <param name="currentActive">The active collection being replaced.</param>
        private void DisposeOldActiveCollection(ClusterActiveCollection currentActive)
        {
            int collectionId = currentActive.GetActiveCollectionId();
            foreach (var consumerEnum in currentActive.finishedConsumerList)
            {
                short removed = 0;
                GetOnProcessDict(consumerEnum).TryRemove(collectionId, out removed);
            }
        }

        /// <summary>
        /// Replaces the active collection. The previous collection's id is remembered in
        /// <see cref="lastActiveCollectionId"/> and its finished consumers are cleaned up.
        /// </summary>
        /// <param name="activeCollectionId">
        /// Id of the new active collection, or <c>ClusterContext.InvalidateId</c> to leave
        /// the cluster without an active collection.
        /// </param>
        public void SetActiveCollection(int activeCollectionId)
        {
            lock (this)
            {
                var previous = this.activeCollection;
                if (previous == null)
                {
                    lastActiveCollectionId = ClusterContext.InvalidateId;
                }
                else
                {
                    lastActiveCollectionId = previous.GetActiveCollectionId();
                    DisposeOldActiveCollection(previous);
                }

                if (activeCollectionId == ClusterContext.InvalidateId)
                {
                    this.activeCollection = null;
                }
                else
                {
                    this.activeCollection = new ClusterActiveCollection(activeCollectionId);
                    this.clusterCollectionCount++;
                }
            }
        }

        /// <summary>Returns the id of the active collection, or -1 when there is none.</summary>
        public int GetActiveCollectionId()
        {
            // Read the field once: it can be set to null concurrently by
            // SetActiveCollection, so a check followed by a second read could throw.
            var current = activeCollection;
            if (current == null)
            {
                return -1;
            }
            return current.GetActiveCollectionId();
        }

        /// <summary>
        /// Counts a delivery attempt against the active collection and reports whether
        /// it is still allowed. Returns <c>false</c> when there is no active collection.
        /// </summary>
        public bool CanDeliverMsg()
        {
            // Snapshot the field so a concurrent reset to null cannot cause a null dereference.
            var current = activeCollection;
            if (current == null)
            {
                return false;
            }
            return current.CanDeliverMsg();
        }

        /// <summary>Returns the list of node ids of this cluster (the stored reference, not a copy).</summary>
        public List<short> GetClusterNodeIds()
        {
            return nodeIds;
        }

        /// <summary>
        /// Marks the cluster as disposed and clears its active collection.
        /// </summary>
        /// <returns>
        /// A pair whose key is the id of the collection that was active (-1 if none) and
        /// whose value is the id of the collection that was active before it.
        /// </returns>
        public KeyValuePair<int, int> DoDispose()
        {
            isDisposed = true;
            // Take the snapshot and perform the swap under one lock so the returned ids
            // describe exactly the state that was cleared. Monitor locks are reentrant,
            // so the nested lock inside SetActiveCollection is safe.
            lock (this)
            {
                int collectionId = GetActiveCollectionId();
                int lastCollectionId = lastActiveCollectionId;
                SetActiveCollection(ClusterContext.InvalidateId);
                return new KeyValuePair<int, int>(collectionId, lastCollectionId);
            }
        }

        /// <summary>Tells whether <see cref="DoDispose"/> has been called.</summary>
        public bool GetIsDisposed()
        {
            return isDisposed;
        }
    }
}
